-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(backend): Add Semaphore and Mutex fields to Workflow CR #11370
base: master
Are you sure you want to change the base?
Conversation
5c420db
to
449cdda
Compare
add |
We should edit the kubeflow manifest to deploy a skeleton of this configmap. You can do that in here or in a follow-up PR. |
I would probably delete this line (and maybe edit the PR title), because that reads like things you enhanced on Workflow itself. We're just setting fields on it... |
What does Argo Workflows do when both are set? A better verification would be to do two separate test pipelines -- one where you use mutex, and one where you use semaphore. And then in addition to verifying the Workflow yaml, also verify that multiple runs are being locked like they should be. |
449cdda
to
497bc6b
Compare
/lgtm |
497bc6b
to
ccc5fd0
Compare
New changes are detected. LGTM label has been removed. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Update: Added Semaphore ConfigMap name environment variable to APIServer deployment manifest, in order to provide users flexibility to set a different CM name. |
85d4fe3
to
b060e03
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good -- just a couple small things left :)
@@ -28,6 +29,7 @@ import ( | |||
"google.golang.org/protobuf/proto" | |||
"google.golang.org/protobuf/types/known/structpb" | |||
k8score "k8s.io/api/core/v1" | |||
v1 "k8s.io/api/core/v1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same import is just above this one :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, removed this import and accordingly also updated the remaining code to use the k8score
import instead.
@@ -40,6 +42,16 @@ type Options struct { | |||
// optional | |||
PipelineRoot string | |||
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. | |||
SemaphoreKey string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love using this Options struct for these fields, but I'm not sure what an alternative would be. I guess it's fine for now and maybe we can figure out a cleaner way down the line.
@@ -119,6 +130,28 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S | |||
Entrypoint: tmplEntrypoint, | |||
}, | |||
} | |||
|
|||
if semaphoreKey != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for this intermediate variable and the block above where they're set. Just use opts.SemaphoreKey
if semaphoreKey != "" { | |
if opts != nil && opts.SemaphoreKey != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, updated to remove the variable initialization block and used opts.SemaphoreKey
and opts.MutexName
directly instead.
@@ -0,0 +1,5 @@ | |||
kind: ConfigMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this lgtm, but I just had a thought. What happens if:
- I install kfp and I get this configmap created
- I customize it by adding keys to it
- I upgrade to the next version of kfp
Is that down the line upgrade going to overwrite my customized configmap with this blank one? It'd be cool if we could test that somehow -- perhaps by manually creating the configmap on a cluster, putting data in it, and then installing kfp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
To resolve this, I've added a job to the configmap, semaphore-configmap-init
that runs during the Kustomize deployment process.
It checks for the existence of the semaphore-config ConfigMap using kubectl get.
If the ConfigMap does not exist, it creates one with an empty init key.
If the ConfigMap already exists, the Job skips creation and exits successfully.
- Added `Semaphore` and `Mutex` fields to the Workflow Spec to support concurrency control mechanisms directly within workflows. - Introduced a new environment variable, `SEMAPHORE_CONFIGMAP_NAME`, to the API Server deployment for managing semaphore configurations. - Added an empty ConfigMap manifest for semaphores to facilitate initial setup and testing. Signed-off-by: ddalvi <[email protected]>
b060e03
to
b5e29b3
Compare
lgtm need to rebase when #11384 is merged, and then I'll tag it |
Resolves #6553
Description of your changes:
This PR introduces support for Pipeline-level
Semaphores
andMutexes
in the KFP backend.Changes Introduced:
Added the ability to specify a semaphore for pipelines, which controls the number of concurrent instances of a pipeline that can run. The semaphore is configured via a fixed ConfigMap named
semaphore-config
. The semaphore key is provided through the pipeline configuration.Added mutex support for pipelines, ensuring that only one instance of the pipeline can run at a time if the specified mutex is defined. Mutex names are defined per pipeline, and each pipeline instance respects the specified mutex.
The Workflow CR now includes a
Synchronization
field, wheresemaphore
andmutex
are appropriately set.If a pipeline has a semaphore, the backend maps the semaphore to the
semaphore-config
ConfigMap using the key provided by the user. Mutexes are represented by their name, ensuring mutual exclusion.This PR should be merged only after #11340 gets merged.
Testing instructions
Build the API Server image and push to an image registry
Upload main.yaml file from here
Check in KFP UI Pipeline Spec tab if the following snippet is present:
Scenarios and Argo Workflows UI Verification
Scenario 1: Only Semaphore
Update the pipeline configuration to include only
semaphoreKey
.Trigger multiple runs at the same time; the number of runs should be greater than the
semaphore
value we've set.Check the Argo Workflows UI for the following message for the latest workflows:
Waiting for kubeflow/ConfigMap/semaphore-config/semaphore lock. Lock status: 0/X
Verify the Workflow CR:
Scenario 2: Only Mutex
Update the pipeline configuration to include only
mutexName
.Trigger more than one runs
Check the Argo Workflows UI for the following message on the latest workflows:
Waiting for kubeflow/Mutex/mutex lock. Lock status: 0/X
Verify the Workflow CR:
Scenario 3: Both Semaphore and Mutex
semaphoreKey
andmutexName
.Checklist: